/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Manages the creation of MemStoreLAB chunks. A monotonically increasing id is associated with every chunk.
 */
@InterfaceAudience.Private
public class ChunkCreator {

    private static final Logger LOG = LoggerFactory.getLogger(ChunkCreator.class);

    // Source of chunk ids: monotonically increasing, starts at 1.
    private AtomicInteger chunkID = new AtomicInteger(1);
    // Chunks are mapped against this monotonically increasing chunk id (see chunkIdMap).
    // We need to preserve the natural ordering of the key.
    // CellChunkMap creation should convert the weak ref to hard reference
    // NOTE(review): chunkIdMap as declared in this class holds ordinary (strong) references,
    // so the "weak ref" wording above looks stale -- confirm before relying on it.

    // The chunk id is the first integer written on each chunk;
    // the header size needs to be changed if the size of the chunk id changes.
    public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;

    /**
     * Types of chunks, based on their sizes
     */
    public enum ChunkType {
        // An index chunk is a small chunk, allocated from the index chunks pool.
        // Its size is fixed and is 10% of the size of a data chunk.
        INDEX_CHUNK,
        // A data chunk is a regular chunk, allocated from the data chunks pool.
        // Its size is fixed and given as input to the ChunkCreator c'tor.
        DATA_CHUNK,
        // A jumbo chunk isn't allocated from pool. Its size is bigger than the size of a
        // data chunk, and is determined per chunk (meaning, there is no fixed jumbo size).
        JUMBO_CHUNK
    }

    // Mapping from chunk ids to chunks. createChunk() registers pool chunks and chunks
    // requested with the CHUNK_MAP index type here; other chunks are never added.
    private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();

    // Whether pool chunks are created off heap (see createChunk()).
    private final boolean offheap;

    // The shared instance; assigned by initialize().
    @VisibleForTesting
    static ChunkCreator instance;

    // When true, initializePool() returns null, i.e. no chunk pool is created.
    @VisibleForTesting
    static boolean chunkPoolDisabled = false;
    // Pool of data chunks; null when initializePool() decided not to create one.
    private MemStoreChunkPool dataChunksPool;
    // Data chunk size given to the constructor; used when there is no data chunks pool.
    private int chunkSize;
    // Pool of index chunks; null when initializePool() decided not to create one.
    private MemStoreChunkPool indexChunksPool;

    /**
     * Builds a chunk creator and sets up its data and index chunk pools.
     *
     * @param chunkSize                the size of a data chunk, in bytes
     * @param offheap                  whether pool chunks are created offheap
     * @param globalMemStoreSize       the global memstore size
     * @param poolSizePercentage       fraction of the global memstore size given to the pools
     * @param initialCountPercentage   fraction of each pool's max count to pre-allocate
     * @param heapMemoryManager        the heap memory manager the pools register with, may be null
     * @param indexChunkSizePercentage fraction used for sizing the index chunks and their pool
     */
    @VisibleForTesting
    ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage, float initialCountPercentage,
            HeapMemoryManager heapMemoryManager, float indexChunkSizePercentage) {
        // kept so that chunks can still be sized when no pool is allocated
        this.chunkSize = chunkSize;
        // must be set before the pools are built: pool construction pre-allocates chunks
        // through createChunk(), which reads this flag
        this.offheap = offheap;
        initializePools(chunkSize, globalMemStoreSize, poolSizePercentage, indexChunkSizePercentage, initialCountPercentage,
                heapMemoryManager);
    }

    /**
     * Creates the data chunks pool and then the index chunks pool, splitting
     * {@code poolSizePercentage} between them according to {@code indexChunkSizePercentage}.
     * Either pool field ends up null when {@link #initializePool} decides not to create a pool.
     */
    @VisibleForTesting
    private void initializePools(int chunkSize, long globalMemStoreSize, float poolSizePercentage, float indexChunkSizePercentage,
            float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
        final float dataPoolShare = (1 - indexChunkSizePercentage) * poolSizePercentage;
        this.dataChunksPool =
                initializePool("data", globalMemStoreSize, dataPoolShare, initialCountPercentage, chunkSize, heapMemoryManager);

        // The index chunks pool is needed only when the index type is CCM.
        // Since the pools are not created at all when the index type isn't CCM,
        // we don't need to check it here.
        final float indexPoolShare = indexChunkSizePercentage * poolSizePercentage;
        final int indexChunkSize = (int) (indexChunkSizePercentage * chunkSize);
        this.indexChunksPool =
                initializePool("index", globalMemStoreSize, indexPoolShare, initialCountPercentage, indexChunkSize, heapMemoryManager);
    }

    /**
     * Initializes the shared ChunkCreator instance. If an instance already exists it is returned
     * unchanged and the arguments are ignored.
     *
     * @param chunkSize              the chunkSize
     * @param offheap                indicates if the chunk is to be created offheap or not
     * @param globalMemStoreSize     the global memstore size
     * @param poolSizePercentage     pool size percentage
     * @param initialCountPercentage the initial count of the chunk pool if any
     * @param heapMemoryManager      the heapmemory manager
     * @return the singleton ChunkCreator
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", justification = "Method is called by single thread at the starting of RS")
    @VisibleForTesting
    public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage,
            float initialCountPercentage, HeapMemoryManager heapMemoryManager) {
        if (instance == null) {
            // first call: build the instance with the default index chunk percentage
            instance = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, initialCountPercentage,
                    heapMemoryManager, MemStoreLABImpl.INDEX_CHUNK_PERCENTAGE_DEFAULT);
        }
        return instance;
    }

    /**
     * @return the shared instance, or null if nothing has assigned it yet
     *         (for example, {@link #initialize} has not been called)
     */
    @VisibleForTesting
    public static ChunkCreator getInstance() {
        return instance;
    }

    /**
     * Creates and inits a chunk of the given chunk type, using the ARRAY_MAP index type.
     *
     * @param chunkType the type of chunk requested
     * @return the chunk that was initialized
     */
    Chunk getChunk(ChunkType chunkType) {
        return getChunk(CompactingMemStore.IndexType.ARRAY_MAP, chunkType);
    }

    /**
     * Creates and inits a chunk. The default implementation: a data chunk with the
     * ARRAY_MAP index type.
     *
     * @return the chunk that was initialized
     */
    Chunk getChunk() {
        return getChunk(CompactingMemStore.IndexType.ARRAY_MAP, ChunkType.DATA_CHUNK);
    }

    /**
     * Creates and inits a data chunk for a specific index type.
     *
     * @param chunkIndexType the index type the chunk is going to be used with
     * @return the chunk that was initialized
     */
    Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
        return getChunk(chunkIndexType, ChunkType.DATA_CHUNK);
    }

    /**
     * Creates and inits a chunk with specific index type and chunk type.
     * The chunk size is the one {@link #getChunkSize(ChunkType)} reports for the type: the index
     * pool's chunk size for INDEX_CHUNK when an index pool exists, otherwise the data chunk size.
     *
     * @param chunkIndexType the index type the chunk is going to be used with
     * @param chunkType      INDEX_CHUNK or DATA_CHUNK
     * @return the chunk that was initialized
     * @throws IllegalArgumentException if chunkType is neither INDEX_CHUNK nor DATA_CHUNK
     */
    Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, ChunkType chunkType) {
        // size resolution (including the rejection of other chunk types) is shared with getChunkSize
        final int sizeForType = getChunkSize(chunkType);
        return getChunk(chunkIndexType, sizeForType);
    }

    /**
     * Creates and inits a chunk of the requested size. A chunk is taken from the data or index
     * pool when the size equals that pool's chunk size; otherwise, or when the pool hands out
     * nothing, a new chunk is created on demand.
     *
     * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
     * @param size           the size of the chunk to be allocated, in bytes
     * @return the chunk that was initialized
     */
    Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, int size) {
        // choose the pool whose fixed chunk size matches the request; the data pool is checked first
        final MemStoreChunkPool matchingPool;
        if (dataChunksPool != null && size == dataChunksPool.getChunkSize()) {
            matchingPool = dataChunksPool;
        } else if (indexChunksPool != null && size == indexChunksPool.getChunkSize()) {
            matchingPool = indexChunksPool;
        } else {
            matchingPool = null;
        }

        // the pool creates the chunk internally; null means it has run out of maxCount
        Chunk result = (matchingPool == null) ? null : matchingPool.getChunk();
        if (matchingPool != null && result == null && LOG.isTraceEnabled()) {
            LOG.trace("The chunk pool is full. Reached maxCount= " + matchingPool.getMaxCount() + ". Creating chunk onheap.");
        }

        if (result == null) {
            // on-demand chunk; it is put into chunkIdMap only when the CellChunkMap index is requested
            result = createChunk(false, chunkIndexType, size);
        }

        // now we need to actually do the expensive memory allocation step in case of a new chunk,
        // else only the offset is set to the beginning of the chunk to accept allocations
        result.init();
        return result;
    }

    /**
     * Creates and inits a chunk of a special size, bigger than a regular chunk size.
     * Such a chunk will never come from pool and will always be on demand allocated.
     * If the requested size (plus the chunk header) does not exceed the regular data chunk size,
     * a regular chunk is returned instead and a warning is logged.
     *
     * @param jumboSize the special size to be used
     * @return the chunk that was initialized
     */
    Chunk getJumboChunk(int jumboSize) {
        int allocSize = jumboSize + SIZEOF_CHUNK_HEADER;
        // getChunkSize() falls back to the configured chunk size when there is no data chunks
        // pool, so this no longer dereferences a null dataChunksPool
        int regularChunkSize = getChunkSize();
        if (allocSize <= regularChunkSize) {
            LOG.warn("Jumbo chunk size {} must be more than regular chunk size {}. Converting to regular chunk.", jumboSize,
                    regularChunkSize);
            return getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
        }
        // the new chunk is going to hold the jumbo cell data and needs to be referenced by
        // a strong map. Therefore the CCM index type
        return getChunk(CompactingMemStore.IndexType.CHUNK_MAP, allocSize);
    }

    /**
     * Creates the chunk either onheap or offheap and assigns it the next chunk id.
     * The chunk is registered in chunkIdMap when it belongs to a pool or when the
     * CHUNK_MAP index type is requested.
     *
     * @param pool           indicates if the chunks have to be created which will be used by the Pool
     * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
     * @param size           the size of the chunk to be allocated, in bytes
     * @return the chunk
     */
    private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType, int size) {
        final int id = chunkID.getAndIncrement();
        assert id > 0;

        // offheap chunks are created for the pool only; on-demand chunks are always onheap
        final Chunk created = (pool && this.offheap) ? new OffheapChunk(size, id, pool) : new OnheapChunk(size, id, pool);

        final boolean keepInMap = pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP);
        if (keepInMap) {
            // mapping the chunk keeps a reference to it so it is not GC-ed
            this.chunkIdMap.put(created.getId(), created);
        }
        return created;
    }

    /**
     * Creates a chunk on behalf of one of the pools.
     * Chunks from pool are created covered with strong references anyway.
     * TODO: change to CHUNK_MAP if it is generally defined
     *
     * @param chunkIndexType the index type passed on to createChunk
     * @param chunkSize      the size of the chunk; must equal the chunk size of an existing pool
     * @return the new chunk, or null when chunkSize matches neither existing pool
     */
    private Chunk createChunkForPool(CompactingMemStore.IndexType chunkIndexType, int chunkSize) {
        // either pool may be absent (null), so each is checked before its size is read
        boolean matchesDataPool = dataChunksPool != null && chunkSize == dataChunksPool.getChunkSize();
        boolean matchesIndexPool = indexChunksPool != null && chunkSize == indexChunksPool.getChunkSize();
        if (!matchesDataPool && !matchesIndexPool) {
            return null;
        }
        return createChunk(true, chunkIndexType, chunkSize);
    }

    /**
     * Used to translate the ChunkID into a chunk ref.
     *
     * @param id the chunk id
     * @return the chunk mapped to the id, or null if the chunk is not (or no longer) in chunkIdMap
     */
    @VisibleForTesting
    Chunk getChunk(int id) {
        // can return null if chunk was never mapped
        return chunkIdMap.get(id);
    }

    /**
     * @return the offheap flag this creator was constructed with
     */
    boolean isOffheap() {
        return this.offheap;
    }

    /**
     * Removes from chunkIdMap every mapping whose chunk id is contained in the given set.
     *
     * @param chunkIDs the ids of the chunks to unmap
     */
    private void removeChunks(Set<Integer> chunkIDs) {
        this.chunkIdMap.keySet().removeAll(chunkIDs);
    }

    /**
     * Removes the mapping of a single chunk id from chunkIdMap.
     *
     * @param chunkId the id of the chunk to unmap
     * @return the chunk that was mapped to the id, or null if there was no mapping
     */
    Chunk removeChunk(int chunkId) {
        return this.chunkIdMap.remove(chunkId);
    }

    /**
     * The chunks in the chunkIdMap may already be released so we shouldn't rely
     * on this counting for strong correctness. This method is used only in testing.
     *
     * @return the current number of entries in chunkIdMap
     */
    @VisibleForTesting
    int numberOfMappedChunks() {
        return this.chunkIdMap.size();
    }

    /**
     * Removes all mappings from chunkIdMap.
     */
    @VisibleForTesting
    void clearChunkIds() {
        this.chunkIdMap.clear();
    }

    /**
     * A pool of {@link Chunk} instances.
     *
     * MemStoreChunkPool caches a number of retired chunks for reuse. This can reduce the
     * number of bytes allocated while writing, thereby easing garbage collection in the JVM.
     */
    private class MemStoreChunkPool implements HeapMemoryTuneObserver {
        // Fixed size, in bytes, of every chunk handled by this pool.
        private final int chunkSize;
        // Upper bound on the number of chunks this pool creates; changed by onHeapMemoryTune().
        // NOTE(review): this field is read and written without synchronization or volatile;
        // if tuning runs on a different thread than getChunk()/putbackChunks(), confirm
        // whether visibility of updates matters here.
        private int maxCount;

        // A queue of reclaimed chunks
        private final BlockingQueue<Chunk> reclaimedChunks;
        // Fraction of the memstore size used by onHeapMemoryTune() to recompute maxCount.
        private final float poolSizePercentage;

        /**
         * Statistics thread schedule pool
         */
        private final ScheduledExecutorService scheduleThreadPool;
        /**
         * Period, and initial delay, of the statistics task; used with TimeUnit.SECONDS.
         */
        private static final int statThreadPeriod = 60 * 5;
        // Number of chunks created by this pool (starts at the pre-allocated initial count).
        private final AtomicLong chunkCount = new AtomicLong();
        // Number of times a reclaimed chunk was handed out again.
        private final LongAdder reusedChunkCount = new LongAdder();
        // Name of the pool used in log messages.
        private final String label;

        /**
         * Creates the pool, pre-allocates {@code initialCount} initialized chunks into the
         * reclaimed queue and schedules the periodic statistics task.
         *
         * @param label              name of the pool used in log messages
         * @param chunkSize          size of each chunk in the pool, in bytes
         * @param maxCount           maximal number of chunks the pool may create
         * @param initialCount       number of chunks to pre-allocate
         * @param poolSizePercentage fraction of the memstore size this pool may use
         */
        MemStoreChunkPool(String label, int chunkSize, int maxCount, int initialCount, float poolSizePercentage) {
            this.label = label;
            this.chunkSize = chunkSize;
            this.maxCount = maxCount;
            this.poolSizePercentage = poolSizePercentage;
            this.reclaimedChunks = new LinkedBlockingQueue<>();

            // pre-allocate the initial chunks; they are registered as pool chunks
            int preallocated = 0;
            while (preallocated < initialCount) {
                Chunk pooled = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP, chunkSize);
                pooled.init();
                reclaimedChunks.add(pooled);
                preallocated++;
            }
            chunkCount.set(initialCount);

            // the statistics thread name is derived from the name of the constructing thread
            final String creatorThreadName = Thread.currentThread().getName();
            this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
                    new ThreadFactoryBuilder().setNameFormat(creatorThreadName + "-MemStoreChunkPool Statistics").setDaemon(true)
                            .build());
            this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
        }

        /**
         * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
         * not yet created max allowed chunks count. When we have already created max allowed chunks and
         * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
         * then.
         * Note: Chunks returned by this pool must be put back to the pool after its use.
         * This overload delegates to {@link #getChunk(CompactingMemStore.IndexType)} with ARRAY_MAP.
         *
         * @return a chunk, or null if the pool cannot provide one
         * @see #putbackChunks(Chunk)
         */
        Chunk getChunk() {
            return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
        }

        /**
         * Hands out a reclaimed chunk (after resetting it) when one is available; otherwise
         * creates a new pool chunk as long as fewer than maxCount chunks have been created.
         *
         * @param chunkIndexType the index type passed on when a new chunk has to be created
         * @return a chunk, or null when nothing is reclaimed and maxCount has been reached
         */
        Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
            final Chunk recycled = reclaimedChunks.poll();
            if (recycled != null) {
                recycled.reset();
                reusedChunkCount.increment();
                return recycled;
            }

            // Make a chunk iff we have not yet created the maxCount chunks; the compare-and-set
            // reserves a slot in the count and is retried when another update got in between
            long createdSoFar;
            while ((createdSoFar = this.chunkCount.get()) < this.maxCount) {
                if (this.chunkCount.compareAndSet(createdSoFar, createdSoFar + 1)) {
                    return createChunkForPool(chunkIndexType, chunkSize);
                }
            }
            return null;
        }

        /**
         * @return the fixed size, in bytes, of the chunks of this pool
         */
        int getChunkSize() {
            return chunkSize;
        }

        /**
         * Returns a chunk to the pool. The chunk is queued for reuse only if it is a pool chunk
         * of this pool's chunk size and the reclaimed queue holds fewer than maxCount chunks;
         * otherwise its mapping is removed from the enclosing ChunkCreator.
         *
         * @param c the chunk to put back
         */
        private void putbackChunks(Chunk c) {
            final boolean hasRoom = this.maxCount - reclaimedChunks.size() > 0;
            final boolean poolable = c.isFromPool() && c.size == chunkSize && hasRoom;
            if (!poolable) {
                // remove the chunk (that is not going to pool)
                // though it is initially from the pool or not
                ChunkCreator.this.removeChunk(c.getId());
                return;
            }
            reclaimedChunks.add(c);
        }

        /**
         * Periodic task that logs usage statistics of the enclosing pool at DEBUG level.
         * It is only ever handed to the scheduled executor, which runs it on its own thread,
         * so it is a plain Runnable rather than a Thread subclass that is never started.
         */
        private class StatisticsThread implements Runnable {

            @Override
            public void run() {
                logStats();
            }

            /**
             * Logs pool size, created and reused chunk counts and the reuse ratio.
             * Does nothing when DEBUG logging is disabled.
             */
            private void logStats() {
                if (!LOG.isDebugEnabled()) {
                    return;
                }
                long created = chunkCount.get();
                long reused = reusedChunkCount.sum();
                long total = created + reused;
                // avoid dividing by zero when no chunk was created or reused yet
                LOG.debug("{} stats (chunk size={}): current pool size={}, created chunk count={}, " + "reused chunk count={}, reuseRatio={}", label,
                        chunkSize, reclaimedChunks.size(), created, reused,
                        (total == 0 ? "0" : StringUtils.formatPercent((float) reused / (float) total, 2)));
            }
        }

        /**
         * @return the current maximal number of chunks this pool may create
         */
        private int getMaxCount() {
            return this.maxCount;
        }

        /**
         * Recomputes maxCount from the new memstore size. Nothing is tuned for an offheap
         * memstore. When the max count shrinks, surplus chunks are dropped from the reclaimed queue.
         *
         * @param newMemstoreSize   the new memstore size
         * @param newBlockCacheSize the new block cache size (not used here)
         */
        @Override
        public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
            // don't do any tuning in case of offheap memstore
            if (isOffheap()) {
                LOG.warn("{} not tuning the chunk pool as it is offheap", label);
                return;
            }

            final int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / getChunkSize());
            final int previousMaxCount = this.maxCount;
            if (newMaxCount == previousMaxCount) {
                // no adjustment in the chunks numbers is needed
                return;
            }

            if (newMaxCount > previousMaxCount) {
                // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
                // create and add them to Q
                LOG.info("{} max count for chunks increased from {} to {}", this.label, previousMaxCount, newMaxCount);
                this.maxCount = newMaxCount;
                return;
            }

            // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
            // itself. If the extra chunks are serving already, do not pool those when we get them back
            LOG.info("{} max count for chunks decreased from {} to {}", this.label, previousMaxCount, newMaxCount);
            this.maxCount = newMaxCount;
            if (this.reclaimedChunks.size() > newMaxCount) {
                synchronized (this) {
                    // NOTE(review): chunks polled here are only dropped from the queue; this block
                    // does not remove them from the enclosing chunkIdMap -- confirm that is intended
                    while (this.reclaimedChunks.size() > newMaxCount) {
                        this.reclaimedChunks.poll();
                    }
                }
            }
        }
    }

    /**
     * Resets chunkPoolDisabled to false, so that pools are no longer suppressed by that flag.
     */
    @VisibleForTesting
    static void clearDisableFlag() {
        chunkPoolDisabled = false;
    }

    /**
     * Creates one chunk pool, or decides that no pool is to be used.
     *
     * @param label                  name of the pool, used in log and error messages
     * @param globalMemStoreSize     the global memstore size
     * @param poolSizePercentage     fraction of the global memstore size the pool may use
     * @param initialCountPercentage fraction of the pool's max count to pre-allocate
     * @param chunkSize              size of the chunks of this pool, in bytes
     * @param heapMemoryManager      manager to register the pool with as tune observer, may be null
     * @return the new pool, or null when poolSizePercentage is not positive or pools are disabled
     * @throws IllegalArgumentException if poolSizePercentage is above 1.0 or
     *                                  initialCountPercentage is outside [0.0, 1.0]
     */
    private MemStoreChunkPool initializePool(String label, long globalMemStoreSize, float poolSizePercentage, float initialCountPercentage,
            int chunkSize, HeapMemoryManager heapMemoryManager) {
        if (poolSizePercentage <= 0) {
            LOG.info("{} poolSizePercentage is less than 0. So not using pool", label);
            return null;
        }
        if (chunkPoolDisabled) {
            return null;
        }

        // validate both percentages before sizing the pool
        if (poolSizePercentage > 1.0) {
            throw new IllegalArgumentException(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
        }
        final boolean initialShareOutOfRange = initialCountPercentage < 0 || initialCountPercentage > 1.0;
        if (initialShareOutOfRange) {
            throw new IllegalArgumentException(label + " " + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
        }

        final int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
        final int initialCount = (int) (initialCountPercentage * maxCount);
        LOG.info("Allocating {} MemStoreChunkPool with chunk size {}, max count {}, initial count {}", label, StringUtils.byteDesc(chunkSize),
                maxCount, initialCount);

        final MemStoreChunkPool pool = new MemStoreChunkPool(label, chunkSize, maxCount, initialCount, poolSizePercentage);
        if (heapMemoryManager != null) {
            // Register with Heap Memory manager
            heapMemoryManager.registerTuneObserver(pool);
        }
        return pool;
    }

    /**
     * @return the max count of the data chunks pool, or 0 when there is no data chunks pool
     */
    @VisibleForTesting
    int getMaxCount() {
        return getMaxCount(ChunkType.DATA_CHUNK);
    }

    /**
     * Reports the max chunk count of the pool serving the given chunk type.
     *
     * @param chunkType INDEX_CHUNK or DATA_CHUNK
     * @return the pool's max count, or 0 when the corresponding pool does not exist
     * @throws IllegalArgumentException if chunkType is neither INDEX_CHUNK nor DATA_CHUNK
     */
    @VisibleForTesting
    int getMaxCount(ChunkType chunkType) {
        switch (chunkType) {
            case INDEX_CHUNK:
                return (indexChunksPool == null) ? 0 : indexChunksPool.getMaxCount();
            case DATA_CHUNK:
                return (dataChunksPool == null) ? 0 : dataChunksPool.getMaxCount();
            default:
                throw new IllegalArgumentException("chunkType must either be INDEX_CHUNK or DATA_CHUNK");
        }
    }

    /**
     * @return the number of reclaimed chunks currently held by the data chunks pool,
     *         or 0 when there is no data chunks pool
     */
    @VisibleForTesting
    int getPoolSize() {
        return getPoolSize(ChunkType.DATA_CHUNK);
    }

    /**
     * Reports how many reclaimed chunks the pool serving the given chunk type currently holds.
     *
     * @param chunkType INDEX_CHUNK or DATA_CHUNK
     * @return the size of the pool's reclaimed queue, or 0 when the corresponding pool does not exist
     * @throws IllegalArgumentException if chunkType is neither INDEX_CHUNK nor DATA_CHUNK
     */
    @VisibleForTesting
    int getPoolSize(ChunkType chunkType) {
        switch (chunkType) {
            case INDEX_CHUNK:
                return (indexChunksPool == null) ? 0 : indexChunksPool.reclaimedChunks.size();
            case DATA_CHUNK:
                return (dataChunksPool == null) ? 0 : dataChunksPool.reclaimedChunks.size();
            default:
                throw new IllegalArgumentException("chunkType must either be INDEX_CHUNK or DATA_CHUNK");
        }
    }

    /**
     * Tells whether the chunk with the given id currently sits in the reclaimed queue of the
     * data or the index chunks pool.
     *
     * @param chunkId the id of the chunk
     * @return true if the chunk is mapped and held by one of the pools, false otherwise
     */
    @VisibleForTesting
    boolean isChunkInPool(int chunkId) {
        final Chunk mapped = getChunk(chunkId);
        if (mapped == null) {
            // unmapped chunks cannot be pool chunks
            return false;
        }

        final boolean inDataPool = dataChunksPool != null && dataChunksPool.reclaimedChunks.contains(mapped);
        return inDataPool || (indexChunksPool != null && indexChunksPool.reclaimedChunks.contains(mapped));
    }

    /*
     * Only used in testing.
     * Empties the reclaimed queues of the data and index chunks pools, when those pools exist.
     * Only the queues are cleared by this method; chunkIdMap is not touched here.
     */
    @VisibleForTesting
    void clearChunksInPool() {
        if(dataChunksPool != null) {
            dataChunksPool.reclaimedChunks.clear();
        }
        if(indexChunksPool != null) {
            indexChunksPool.reclaimedChunks.clear();
        }
    }

    /**
     * @return the size of a data chunk: the data pool's chunk size, or the chunk size given
     *         to the constructor when there is no data chunks pool
     */
    int getChunkSize() {
        return getChunkSize(ChunkType.DATA_CHUNK);
    }

    /**
     * Reports the chunk size used for the given chunk type.
     *
     * @param chunkType INDEX_CHUNK or DATA_CHUNK
     * @return the index pool's chunk size for INDEX_CHUNK when an index pool exists; otherwise
     *         the data pool's chunk size, or the constructor-given chunk size when there is no data pool
     * @throws IllegalArgumentException if chunkType is neither INDEX_CHUNK nor DATA_CHUNK
     */
    int getChunkSize(ChunkType chunkType) {
        switch (chunkType) {
            case INDEX_CHUNK:
                if (indexChunksPool != null) {
                    return indexChunksPool.getChunkSize();
                }
                // fall through: without an index pool the data chunk size is reported
            case DATA_CHUNK:
                // When pools are empty, use the configured chunk size
                return (dataChunksPool == null) ? chunkSize : dataChunksPool.getChunkSize();
            default:
                throw new IllegalArgumentException("chunkType must either be INDEX_CHUNK or DATA_CHUNK");
        }
    }

    /**
     * Releases the chunks with the given ids. Pool chunks are handed back to the pool they
     * match; every other mapped chunk is removed from chunkIdMap so that it can be collected.
     * Ids that are not mapped are ignored.
     *
     * @param chunks the ids of the chunks that are released
     */
    synchronized void putbackChunks(Set<Integer> chunks) {
        // if there is no pool just try to clear the chunkIdMap in case there is something
        if (dataChunksPool == null && indexChunksPool == null) {
            this.removeChunks(chunks);
            return;
        }

        // if there is a pool, go over all chunk IDs that came back, the chunks may be from pool or not
        for (int id : chunks) {
            // translate chunk ID to chunk, if chunk initially wasn't in pool
            // this translation will (most likely) return null
            Chunk chunk = getChunk(id);
            if (chunk == null) {
                // it was never covered by the chunkIdMap (and so wasn't in pool also),
                // so we have nothing to do on its release
                continue;
            }
            // only one of the two pools may exist, so each pool is null-checked before use
            if (chunk.isFromPool() && indexChunksPool != null && chunk.isIndexChunk()) {
                indexChunksPool.putbackChunks(chunk);
            } else if (chunk.isFromPool() && dataChunksPool != null && chunk.size == dataChunksPool.getChunkSize()) {
                dataChunksPool.putbackChunks(chunk);
            } else {
                // chunks which are not from one of the pools
                // should be released without going to the pools.
                // Removing them from chunkIdMap will cause their removal by the GC.
                this.removeChunk(id);
            }
        }
    }

}

